/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.rm.datasource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.BranchStatus;
import io.seata.rm.datasource.undo.UndoLogManager;
import io.seata.rm.datasource.undo.UndoLogManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.core.constants.ConfigurationKeys.CLIENT_ASYNC_COMMIT_BUFFER_LIMIT;
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT;

/**
 * The type Async worker.
 *
 * @author sharajava
 */
public class AsyncWorker {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);

    /** Initial capacity hint for the per-resource grouping map. */
    private static final int DEFAULT_RESOURCE_SIZE = 16;

    /** Max number of undo-log records deleted per batch, to keep the generated SQL bounded. */
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;

    /** Value of config item "client.rm.asyncCommitBufferLimit"; default is 10000. */
    private static final int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
        CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);

    private final DataSourceManager dataSourceManager;

    /** Buffer of phase-two contexts whose undo logs are waiting to be deleted. */
    private final BlockingQueue<Phase2Context> commitQueue;

    /** Drives the periodic drain of {@link #commitQueue}; also used as the overflow executor. */
    private final ScheduledExecutorService scheduledExecutor;

    public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;

        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);

        ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
        scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
        // Drain and process the commit queue once per second (first run after 10ms).
        scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * Commit a branch transaction asynchronously.
     * <p>
     * The (xid, branchId, resourceId) triple is buffered in {@link #commitQueue}; the actual
     * undo-log cleanup happens later on the scheduled executor. The branch is therefore
     * reported as committed immediately.
     *
     * @param xid        the global transaction id
     * @param branchId   the branch transaction id
     * @param resourceId the resource id the branch ran against
     * @return always {@link BranchStatus#PhaseTwo_Committed}
     */
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Enqueue a phase-two context for deferred undo-log deletion.
     * <p>
     * If the queue (capacity {@link #ASYNC_COMMIT_BUFFER_LIMIT}) has room, the context is
     * buffered and this method returns. If the queue is full, the queue is drained and
     * processed asynchronously on {@link #scheduledExecutor}, after which this context is
     * enqueued again. Note that even a non-full queue is still drained every second by the
     * fixed-rate task scheduled in the constructor, so buffered contexts never starve.
     */
    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }

        // Queue is full: flush it asynchronously, then retry enqueueing this context.
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
    }

    /**
     * Run {@link #doBranchCommit()} and swallow any throwable so that the periodic
     * scheduled task is never cancelled by an exception escaping a run.
     */
    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }

    /**
     * Drain the commit queue, group the buffered contexts by resource id, and delete
     * the corresponding undo logs per resource.
     */
    private void doBranchCommit() {
        if (commitQueue.isEmpty()) {
            return;
        }

        // drainTo moves all currently-buffered elements without blocking.
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);

        // Group by resourceId: key = resourceId, value = contexts for that resource.
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);

        groupedContexts.forEach(this::dealWithGroupedContexts);
    }

    /**
     * Group phase-two contexts by their resource id.
     *
     * @param contexts the contexts to group
     * @return a map from resourceId to the contexts belonging to that resource
     */
    Map<String, List<Phase2Context>> groupedByResourceId(List<Phase2Context> contexts) {
        Map<String, List<Phase2Context>> groupedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        contexts.forEach(context -> {
            List<Phase2Context> group = groupedContexts.computeIfAbsent(context.resourceId, key -> new LinkedList<>());
            group.add(context);
        });
        return groupedContexts;
    }

    /**
     * Delete the undo logs for one resource's worth of phase-two contexts.
     * <p>
     * Opens a single plain (non-proxied) connection for the resource, splits the contexts
     * into partitions of at most {@link #UNDOLOG_DELETE_LIMIT_SIZE} (so the generated
     * delete SQL stays bounded), deletes each partition's undo logs on that connection,
     * and finally closes the connection.
     *
     * @param resourceId the resource id shared by all contexts in this group
     * @param contexts   the phase-two contexts of this group
     */
    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        // Look up the data source proxy for this resource; nothing to do if it is gone.
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("Failed to find resource for {}", resourceId);
            return;
        }

        Connection conn;
        try {
            conn = dataSourceProxy.getPlainConnection();
        } catch (SQLException sqle) {
            LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
            return;
        }

        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);

        // The connection must stay open across ALL partitions; it is closed exactly once
        // here (closing it inside deleteUndoLog would break every partition after the first).
        try {
            splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
        } finally {
            try {
                conn.close();
            } catch (SQLException closeEx) {
                LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
            }
        }
    }

    /**
     * Batch-delete the undo logs of one partition of phase-two contexts.
     * <p>
     * The caller owns the connection's lifecycle; this method only commits (or rolls
     * back on failure) the work it performed.
     *
     * @param conn           an open connection to the resource's database (not closed here)
     * @param undoLogManager the undo-log manager matching the resource's db type
     * @param contexts       the contexts whose undo logs should be deleted
     */
    private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
        // Collect the global transaction ids and branch ids of this partition.
        Set<String> xids = new LinkedHashSet<>(contexts.size());
        Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
        contexts.forEach(context -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });

        try {
            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        } catch (SQLException e) {
            LOGGER.error("Failed to batch delete undo log", e);
            try {
                conn.rollback();
            } catch (SQLException rollbackEx) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
            }
        }
    }

    /**
     * The phase-two context: everything needed to locate a branch's undo logs.
     */
    static class Phase2Context {

        /** The global transaction id. */
        String xid;

        /** The branch transaction id. */
        long branchId;

        /** The resource id the branch ran against. */
        String resourceId;

        public Phase2Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }
}
